package com;

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import io.netty.channel.group.ChannelGroup;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

/**
 * 转码
 */
@Slf4j
public class TransferRtspHelper {

    private static final ScheduledExecutorService SCHEDULE = Executors.newScheduledThreadPool(2);
    private static final ExecutorService PROCESS_EXECUTOR = Executors.newCachedThreadPool();
    private static Map<String, Transfer> transferMap = new HashMap<>();

    /**
     * 开启
     */
    public static synchronized void open(String ip) throws IOException {
        Transfer transfer = transferMap.get(ip);
        if (transfer == null) {
            transfer = new Transfer();
            transfer.setIp(ip);
            transfer.setStartTime(new Date());
            transfer.setContinueTime(new Date());
            transfer.setCount(1);
            PROCESS_EXECUTOR.submit(new TransferTask(transfer));
            transferMap.put(ip, transfer);
        } else {
            transfer.setContinueTime(new Date());
            transfer.incCount();
        }
        log.info("开始转码：{}", transfer);
    }

    /**
     * 关闭
     */
    public static synchronized void close(String ip) {
        Transfer transfer = transferMap.get(ip);
        if (transfer != null) {
            transfer.decCount();
            if (transfer.getCount() <= 0) {
                Process process = transfer.getProcess();
                if (process != null && process.isAlive()) {
                    process.destroy();
                }
                transferMap.remove(ip);
            }
        }
        log.info("结束转码：{}", transfer);
    }

    /**
     * 若当前有在转码，打印相关信息
     */
    public static void report() {
        Map<String, ChannelGroup> channelGroups = ChannelGroupHolder.getChannelGroups();
        transferMap.forEach((k, v) -> {
            ChannelGroup group = channelGroups.get(k);
            if (group != null && !group.isEmpty()) {
                log.info("当前转码中：{}", v);
            }
        });
    }

    /**
     * 如果没有对应channel，停止转码，如果前端页面还开着，会自动建立回连接
     */
    public static void clear() {
        List<String> toClearIp = new ArrayList<>();
        Map<String, ChannelGroup> channelGroups = ChannelGroupHolder.getChannelGroups();
        transferMap.forEach((k, v) -> {
            ChannelGroup group = channelGroups.get(k);
            if (group == null || group.isEmpty()
                    || System.currentTimeMillis() - v.getContinueTime().getTime() > 1800_000L) {
                toClearIp.add(k);
            }
        });
        if (!toClearIp.isEmpty()) {
            log.info("关闭以下无效转码：{}", toClearIp);
            toClearIp.forEach(TransferRtspHelper::close);
        }
    }

    /**
     * 开启定时器
     */
    public static void schedule() {
        SCHEDULE.scheduleAtFixedRate(TransferRtspHelper::report, 1L, 5L, TimeUnit.MINUTES);
        SCHEDULE.scheduleAtFixedRate(TransferRtspHelper::clear, 60L, 5L, TimeUnit.SECONDS);
    }

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    private static class Transfer {
        private String ip;
        private Process process;
        private int count;
        private Date startTime;
        private Date continueTime;

        public void incCount() {
            count++;
        }

        public void decCount() {
            count--;
        }
    }

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    private static class TransferTask implements Runnable {

        private Transfer transfer;

        @SneakyThrows
        @Override
        public void run() {
            String ip = transfer.getIp();
            String command = "ffmpeg -f rtsp -rtsp_transport tcp -i " + ip
                    + " -strict -2 -c:v libx264 -vsync 2 -c:a aac "
                    + "-f mpegts -codec:v mpeg1video -s 640x480 -b:v 1000k -bf 0 http://127.0.0.1:" + MonitorServer.PORT
                    + "/push/" + ip;
            log.info(command);
            Process process = Runtime.getRuntime().exec(command);
            transfer.setProcess(process);
        }
    }

}
